package com.leilei;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

/**
 * @author lei
 * @version 1.0
 * @date 2021/3/14 15:01
 * @desc 自定义sink redis
 */
public class FlinkSink4_Redis {
    /**
     * Job entry point: wires the demo source to the Redis sink and runs the streaming job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job fails during execution
     */
    public static void main(String[] args) throws Exception {
        // Set up the execution environment in streaming mode.
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        // Source -> (todo: data processing) -> Redis sink.
        environment.addSource(new MySource())
                .addSink(new MyRedisSink());

        // Submit the job.
        environment.execute("redis-sink");
    }


    /**
     * Vehicle alarm record that flows from the source to the sink.
     *
     * <p>Accessors and both constructors are generated by Lombok. The all-args constructor
     * takes the fields in declaration order, so do not reorder them.
     */
    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    public static class VehicleAlarm {
        /** Alarm identifier. */
        private String id;

        /** License plate of the vehicle. */
        private String licensePlate;

        /** Color of the license plate. */
        private String plateColor;

        /** Device timestamp; the demo source fills it with epoch milliseconds. */
        private Long deviceTime;

        /** Zone identifier. */
        private String zone;
    }

    /**
     * Custom demo source: emits one synthetic {@link VehicleAlarm} every ten seconds
     * until the job cancels it.
     */
    public static class MySource implements SourceFunction<VehicleAlarm> {
        private static final long serialVersionUID = 1L;

        /** Pause between two emitted records; also the width of the time bucket used as id. */
        private static final long EMIT_INTERVAL_MS = 10_000L;

        /** Cleared by {@link #cancel()}; volatile because cancel() is called from another thread. */
        private volatile boolean running = true;

        /**
         * Emits records in a loop until {@link #cancel()} is called.
         *
         * @param ctx context used to emit records
         * @throws Exception if the emitting thread is interrupted while sleeping
         */
        @Override
        public void run(SourceContext<VehicleAlarm> ctx) throws Exception {
            while (running) {
                // Read the clock once so the id and deviceTime describe the same instant.
                long now = System.currentTimeMillis();
                long id = now / EMIT_INTERVAL_MS;
                ctx.collect(new VehicleAlarm(String.valueOf(id), "川B" + id, "蓝", now, "sc"));
                Thread.sleep(EMIT_INTERVAL_MS);
            }
        }

        /**
         * Asks {@link #run(SourceContext)} to leave its loop.
         */
        @Override
        public void cancel() {
            running = false;
        }
    }

    /**
     * Custom Redis sink: stores every {@link VehicleAlarm} as a JSON string under the key
     * {@code licensePlate + zone}.
     *
     * <p>Each sink instance owns its own connection pool, created in {@link #open(Configuration)}
     * and released in {@link #close()}. Closing one instance therefore cannot break the
     * connections of other instances running in the same JVM.
     */
    public static class MyRedisSink extends RichSinkFunction<VehicleAlarm> {
        private static final long serialVersionUID = 1L;

        // Connection settings: plain serializable values shipped with the function.
        private final String host;
        private final int port;
        private final int timeoutMillis;
        private final String password;
        private final int database;

        // Runtime resources: not serializable, created in open().
        private transient JedisPool jedisPool;
        private transient Jedis jedis;

        /**
         * Creates a sink with the default (placeholder) connection settings.
         */
        public MyRedisSink() {
            this("xx", 6379, 3000, "xx", 2);
        }

        /**
         * Creates a sink with explicit connection settings.
         *
         * @param host          Redis host
         * @param port          Redis port
         * @param timeoutMillis timeout passed to the Jedis pool, in milliseconds
         * @param password      Redis password
         * @param database      index of the Redis database to use
         */
        public MyRedisSink(String host, int port, int timeoutMillis, String password, int database) {
            this.host = host;
            this.port = port;
            this.timeoutMillis = timeoutMillis;
            this.password = password;
            this.database = database;
        }

        /**
         * Creates the connection pool and borrows the connection used by this sink instance.
         *
         * @param parameters configuration passed in by the runtime
         * @throws Exception if the Redis connection cannot be obtained
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            JedisPool pool = new JedisPool(new JedisPoolConfig(),
                    host, port, timeoutMillis, password, database);
            try {
                // getResource() throws on failure, so no null-polling loop is needed.
                jedis = pool.getResource();
            } catch (RuntimeException e) {
                // Do not leak the pool when the first connection cannot be established.
                pool.close();
                throw e;
            }
            jedisPool = pool;
        }

        /**
         * Writes one record to Redis; called once per incoming element.
         *
         * @param value   the alarm to store
         * @param context sink context (not used)
         * @throws Exception if the Redis write fails
         */
        @Override
        public void invoke(VehicleAlarm value, Context context) throws Exception {
            // Key is license plate + zone id; value is the alarm serialized as JSON.
            jedis.set(value.getLicensePlate() + value.getZone(), JSON.toJSONString(value));
        }

        /**
         * Releases the connection and the pool owned by this sink instance.
         *
         * @throws Exception if releasing a resource fails
         */
        @Override
        public void close() throws Exception {
            try {
                if (jedis != null) {
                    jedis.close();
                }
            } finally {
                jedis = null;
                try {
                    // Runs even if returning the connection failed.
                    if (jedisPool != null) {
                        jedisPool.close();
                    }
                } finally {
                    jedisPool = null;
                    super.close();
                }
            }
        }
    }

}
